Supporting Variable.set in execution time with task SDK#49005
Merged
amoghrajesh merged 6 commits intoapache:mainfrom Apr 10, 2025
Merged
Supporting Variable.set in execution time with task SDK#49005amoghrajesh merged 6 commits intoapache:mainfrom
amoghrajesh merged 6 commits intoapache:mainfrom
Conversation
Contributor
Author
|
Draft at this stage but working on fixing the tests and testing various things. |
ashb
reviewed
Apr 9, 2025
kaxil
reviewed
Apr 9, 2025
kaxil
approved these changes
Apr 9, 2025
hussein-awala
approved these changes
Apr 9, 2025
Contributor
|
Just tested again and as noted in #47920 I still see my integration test DAG failing :-( |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Why?
Variable.set is a common pattern that is used and in the past it would use the database and session to do so. Adding a similar mechanism in the task sdk execution time variable is needed to be able to set variables using the new task sdk API server mechanism.
What?
I have more of less tried to port the behaviour from here: https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/variable.py#L183-L215
And this also includes porting the write conflict check. I am handling thus for the worker side.
This means that if we are trying to write a variable to the metadata DB but same variable exists in the secrets backend, this will throw a warning.
Testing
DAG:
DAG:
Testing with a workers secret backend defined
Setting up a workers secrets backend as
LocalFilesystemBackendwith this env:var.json:
Verified the setup:
DAG:
This dag first prints the value of the variable present in the secrets backend.
Task1:

Task 2 trying to override it:

However the DB has it:

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in airflow-core/newsfragments.